package com.zyx.flink.realtime.app;

import com.alibaba.fastjson.JSONObject;
import com.zyx.flink.realtime.utils.MyKafkaUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Random;

/**
 * Example Kafka producer that publishes generated JSON strings.
 *
 * <p>For a fixed number of rounds and a fixed list of ids, builds a JSON object with the
 * fields {@code id}, {@code age} (a random integer in the range [10, 30)) and {@code name}
 * (always {@code "noname"}), sends its string form to the {@code user_age} topic, and
 * echoes the string to standard output.
 *
 * @author zyx
 * @since 2021/6/11 19:27
 */
public class JsonKafkaProducer {
    /**
     * Generates the sample records and sends them to Kafka.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Random random = new Random();
        String[] ids = {"1001", "1002", "1003", "1004", "1005", "1006"};
        int cycle = 2;

        // try-with-resources guarantees the producer is closed even if building or
        // sending a record throws part-way through the loop.
        try (KafkaProducer<String, String> kafkaProducer = MyKafkaUtil.getKafkaProdocuer()) {
            for (int i = 0; i < cycle; i++) {
                for (String id : ids) {
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("id", id);
                    jsonObject.put("age", 10 + random.nextInt(20));
                    jsonObject.put("name", "noname");
                    String jsonStr = jsonObject.toJSONString();

                    ProducerRecord<String, String> record = new ProducerRecord<>("user_age", jsonStr);
                    // send() is asynchronous; the callback surfaces delivery failures that
                    // would otherwise be dropped along with the ignored Future.
                    kafkaProducer.send(record, (metadata, exception) -> {
                        if (exception != null) {
                            System.err.println("producer send failed >> " + jsonStr + " : " + exception);
                        }
                    });
                    System.out.println("producer >> " + jsonStr);
                }
            }
        }
    }
}
